package org.iot.spark.processor

import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import org.apache.log4j.Logger
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.streaming.{Seconds, State, StateSpec, Time}
import org.apache.spark.streaming.dstream.{DStream, MapWithStateDStream}
import org.iot.spark.enity.{POITrafficData, TotalTrafficData, WindowTrafficData}
import org.iot.spark.tools.HbaseTools
import org.iot.spark.util.{GeoDistanceCalculator, PropertFileReader}
import org.iot.spark.vo.{AggregateKey, IotData, POIData}

/**
  * Empty companion class of the `IotTrafficDataProcessor` object.
  *
  * It declares no members of its own; the companion object passes
  * `classOf[IotTrafficDataProcessor]` to log4j to name its logger.
  */
class IotTrafficDataProcessor

/**
  * Streaming aggregations over IoT traffic events. Each `process*` method derives a
  * stream of result records from the incoming `DStream[IotData]` and saves every
  * record through `HbaseTools.saveBusinessDatas`.
  */
object IotTrafficDataProcessor{

  private val logger = Logger.getLogger(classOf[IotTrafficDataProcessor])

  /** Properties loaded by `PropertFileReader`; the target table settings are looked up here. */
  val prop:Properties = PropertFileReader.readPropertyFile()

  /**
    * Running-total state for `mapWithState`: adds the current batch count of a key to
    * the total stored so far and emits `(key, newTotal)`.
    *
    * The state of a key that receives no data for 3600 seconds is dropped. For that
    * final "timing out" invocation nothing is emitted and the state is left untouched,
    * because updating a state that is timing out is rejected by Spark with an exception.
    */
  val stateSpecTotal: StateSpec[AggregateKey, Int, Long, (AggregateKey, Long)] = StateSpec.function((currentBatchTime: Time, key: AggregateKey, value: Option[Int], currentState: State[Long]) => {
    // currentBatchTime is unused, but the four-argument form is the one that returns an Option.
    val emitted: Option[(AggregateKey, Long)] =
      if (currentState.isTimingOut()) {
        None
      } else {
        val sum = value.getOrElse(0).toLong + currentState.getOption.getOrElse(0L)
        currentState.update(sum)
        Some((key, sum))
      }
    emitted
  }).timeout(Seconds(3600))

  /** Formats `day` as `yyyy-MM-dd`. A new formatter is built per call because `SimpleDateFormat` is not thread-safe. */
  private def formatDay(day: Date): String = new SimpleDateFormat("yyyy-MM-dd").format(day)

  /**
    * Maintains a running vehicle count per (route, vehicle type) and saves each updated
    * total to the table configured under `traffic.total.data`.
    *
    * @param iot_stream incoming IoT events
    */
  def processTotalTrafficData(iot_stream:DStream[IotData]):Unit ={

    // Count the events of each (route, vehicle type) pair within the current batch.
    val routeCounts: DStream[(AggregateKey, Int)] = iot_stream
      .map(iot => (new AggregateKey(iot.getRouteId, iot.getVehicleType), 1))
      .reduceByKey((a, b) => a + b)

    // Fold the per-batch counts into the running totals kept by stateSpecTotal.
    val runningTotals: DStream[(AggregateKey, Long)] = routeCounts.mapWithState(stateSpecTotal)

    val totalTrafficData: DStream[TotalTrafficData] = runningTotals.map { case (key, total) =>
      logger.warn(s"Total Count:key ${key.getRouteId}-----${key.getVehicleType}-----val $total")
      // Use one instant for both the timestamp and the day string so they cannot disagree.
      val now = new Date
      new TotalTrafficData(key.getRouteId, key.getVehicleType, total, now, formatDay(now))
    }

    // Write to HBase with one connection per non-empty partition.
    totalTrafficData.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        if (partition.hasNext) {
          val tableName = prop.getProperty("traffic.total.data")
          val conn = HbaseTools.getHbaseConn
          try {
            partition.foreach(record => HbaseTools.saveBusinessDatas(tableName, record, conn))
          } finally {
            // Release the connection even when a save fails.
            conn.close()
          }
        }
      })
    })
  }


  /**
    * Counts vehicles per (route, vehicle type) over a 30-second window that slides every
    * 10 seconds and saves each count to the table configured under `traffic.window.data`.
    *
    * @param iot_stream incoming IoT events
    */
  def processWindowTrafficData(iot_stream:DStream[IotData]):Unit ={

    val windowCounts: DStream[(AggregateKey, Int)] = iot_stream
      .map(iot => (new AggregateKey(iot.getRouteId, iot.getVehicleType), 1))
      .reduceByKeyAndWindow((x: Int, y: Int) => x + y, Seconds(30), Seconds(10))

    val windowTrafficData: DStream[WindowTrafficData] = windowCounts.map { case (key, count) =>
      logger.warn(s"window Count:key ${key.getRouteId}-----${key.getVehicleType}-----val $count")
      // Use one instant for both the timestamp and the day string so they cannot disagree.
      val now = new Date
      new WindowTrafficData(key.getRouteId, key.getVehicleType, count, now, formatDay(now))
    }

    // Write to HBase with one connection per non-empty partition.
    windowTrafficData.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        if (partition.hasNext) {
          val tableName = prop.getProperty("traffic.window.data")
          val conn = HbaseTools.getHbaseConn
          try {
            partition.foreach(record => HbaseTools.saveBusinessDatas(tableName, record, conn))
          } finally {
            // Release the connection even when a save fails.
            conn.close()
          }
        }
      })
    })
  }

  /**
    * Keeps the events whose route and vehicle type equal the broadcast values and whose
    * position passes `GeoDistanceCalculator.isInPOIRadius` for the broadcast point of
    * interest, computes their distance to that point, and saves the results to the table
    * configured under `traffic.poi.data`.
    *
    * @param iot_stream incoming IoT events
    * @param poi_dat    broadcast triple of (point of interest, route id, vehicle type)
    */
  def processPOIData(iot_stream:DStream[IotData],poi_dat:Broadcast[(POIData,String,String)]):Unit ={
    val matchingEvents: DStream[IotData] = iot_stream.filter(iot => {
      val (poi, routeId, vehicleType) = poi_dat.value
      // `==` is null-safe: an event without a route id or vehicle type is simply filtered out.
      iot.getRouteId == routeId &&
        iot.getVehicleType == vehicleType &&
        GeoDistanceCalculator.isInPOIRadius(iot.getLatitude.toDouble, iot.getlongitude().toDouble,
          poi.latitude, poi.longitude, poi.radius)
    })

    val poiTrafficData: DStream[POITrafficData] = matchingEvents.map(iot => {
      val poi = poi_dat.value._1
      logger.warn(s"poi data:key ${iot.getRouteId}-${iot.getVehicleType}val $poi")
      val distance = GeoDistanceCalculator.GetDistance(iot.getLatitude.toDouble, iot.getlongitude().toDouble, poi.latitude, poi.longitude)
      logger.warn(s"Distance for ${iot.getLatitude},${iot.getlongitude()},${poi.latitude},${poi.longitude} = $distance")
      new POITrafficData(iot.getVehicleId, distance, iot.getVehicleType, new Date)
    })

    // Write to HBase with one connection per non-empty partition.
    poiTrafficData.foreachRDD(rdd => {
      rdd.foreachPartition(partition => {
        if (partition.hasNext) {
          val tableName = prop.getProperty("traffic.poi.data")
          val conn = HbaseTools.getHbaseConn
          try {
            partition.foreach(record => HbaseTools.saveBusinessDatas(tableName, record, conn))
          } finally {
            // Release the connection even when a save fails.
            conn.close()
          }
        }
      })
    })
  }
}
